Skip to content

向量数据库入门实践

学习目标

  • 理解向量数据库的基本概念和应用场景
  • 了解主流向量数据库的特点和优劣势
  • 掌握使用Chroma、FAISS和Milvus构建向量检索系统的方法
  • 学习如何将向量数据库与大语言模型集成
  • 通过实战项目熟悉向量数据库的实际应用

向量数据库简介

向量数据库是专门为存储、管理和检索向量数据而设计的数据库系统。传统数据库针对结构化数据的存储和查询进行了优化,而向量数据库则专注于高维向量的高效索引和相似性搜索。

向量数据库与传统数据库的区别

特性传统数据库(如MySQL)向量数据库(如Milvus)
数据类型结构化数据(数字、文本、日期等)向量(高维数值数组)
查询方式精确匹配、范围查询相似度搜索(近似最近邻查询)
索引结构B-Tree、Hash等HNSW、IVF、PQ等专用向量索引
查询语言SQLAPI调用、特定查询语言
主要应用事务处理、数据分析语义搜索、推荐系统、多模态检索

向量数据库的主要功能

  1. 向量存储:保存嵌入向量及其元数据
  2. 高效索引:构建特殊索引结构,支持快速近似最近邻(ANN)搜索
  3. 相似性搜索:根据余弦相似度、欧几里得距离等度量方法查找相似向量
  4. 过滤和排序:支持基于元数据的过滤和排序
  5. 向量操作:支持向量增删改查和批量操作

主流向量数据库对比

向量数据库是专门设计用于存储、管理和检索向量数据的数据库系统。通过特殊的索引结构和相似度计算方法,可以高效地进行近似最近邻(ANN)搜索,是构建RAG(检索增强生成)系统的重要基础设施。

下表提供了主流向量数据库的对比:

名称类型主要特点核心索引适用场景集成难度
Pinecone云服务,商业完全托管,自动扩展HNSW,IVF企业应用,无运维需求
Weaviate开源+商业多模态,GraphQL接口HNSW知识图谱,多模态搜索
Milvus开源分布式架构,高扩展性HNSW,IVF,PQ大规模生产环境中高
Qdrant开源Rust开发,强大过滤HNSW复杂过滤条件应用
Chroma开源轻量级,Python优先HNSW开发原型,小型应用
FAISS开源库高性能,GPU加速多种索引大规模数据集,性能敏感
Vespa开源综合搜索平台HNSW搜索与向量混合需求
Redis开源+模块低延迟,内存存储HNSW实时应用,中小规模数据

向量索引方法简介

  • HNSW (分层可导航小世界图): 构建多层图结构实现快速导航,检索速度极快但内存消耗大,适合对查询速度要求高的场景

  • IVF (倒排文件索引): 将向量空间分割为多个聚类进行搜索,构建速度较快、内存消耗适中,但召回率略低

  • PQ (乘积量化): 通过将向量分解并量化来压缩存储空间,极大节省内存但有损精度,适合超大规模数据集

  • ANNOY: 使用随机投影树构建索引,可将索引保存到磁盘,适合需要持久化索引的场景

  • 混合索引: 结合多种索引方法的优势,如IVF+PQ兼顾速度和内存,IVF+HNSW结合粗筛和精筛效果

选型关键考量因素

选择合适的向量数据库需要考虑以下关键因素:

1. 数据规模与性能需求

  • 小型应用(百万级向量):Chroma, FAISS(单机)
  • 中型应用(千万级向量):Qdrant, Pinecone, Redis
  • 大型应用(亿级以上向量):Milvus, Vespa, Weaviate企业版

2. 部署与运维复杂度

  • 零运维需求:Pinecone等云服务
  • 轻量级部署:Chroma, Redis, FAISS
  • 自托管分布式:Milvus, Qdrant, Vespa

3. RAG系统集成便捷性

RAG系统通常需要以下流程:文档嵌入→存储→检索→与LLM集成。不同向量数据库在这一流程中的表现:

  • 最易集成:Chroma(与LangChain/LlamaIndex深度集成)
  • 简洁API:Pinecone, Qdrant
  • 复杂但灵活:Milvus, Vespa

4. 向量索引技术

向量数据库的核心是索引技术,主流索引方法包括:

  • HNSW(分层可导航小世界图):建立多层图形结构,平衡速度与准确性
  • IVF(倒排文件索引):将向量空间分割为多个聚类,提高检索效率
  • PQ(乘积量化):将高维向量分解存储,降低内存使用
  • ANNOY:使用随机超平面构建树形结构,平衡内存与速度

通过综合考虑上述因素,可以选择最适合特定应用场景的向量数据库。

现代嵌入模型

现代嵌入模型(如BERT、RoBERTa等)采用了更复杂的方法:

  1. 使用深度神经网络,而不是简单的向量查找表
  2. 考虑上下文信息,同一个词在不同语境下产生不同的向量
  3. 采用预训练+微调的方式,先在大量通用文本上训练,再在特定任务上微调
  4. 引入自监督学习任务,如掩码语言模型(预测被遮挡的词)

这些先进的嵌入模型能够更好地捕捉文本的语义信息,为向量数据库提供高质量的向量表示。

实践一:使用Chroma构建简单的文档检索系统

Chroma是一个轻量级、易于使用的向量数据库,特别适合原型开发和小型应用。它提供了简洁的Python API,可以轻松与其他工具集成。

安装Chroma

方法1:使用pip安装(最简单)

bash
pip install chromadb

方法2:使用Docker安装(推荐用于生产环境)

bash
# 拉取Chroma镜像
docker pull ghcr.io/chroma-core/chroma:latest

# 创建持久化目录
mkdir -p ~/.chroma/data

# 运行Chroma容器
docker run -d \
  -p 8000:8000 \
  -v ~/.chroma/data:/chroma/data \
  --name chroma \
  ghcr.io/chroma-core/chroma:latest

客户端连接(在Python中):

python
import chromadb
# 连接到Docker容器中的Chroma服务
client = chromadb.HttpClient(host="localhost", port=8000)

方法3:使用Docker Compose(适合多容器部署)

创建一个docker-compose.yml文件:

yaml
version: '3'
services:
  chroma:
    image: ghcr.io/chroma-core/chroma:latest
    volumes:
      - chroma_data:/chroma/data
    ports:
      - "8000:8000"
    restart: unless-stopped

volumes:
  chroma_data:

运行命令:

bash
docker-compose up -d

基本使用方法

python
import chromadb
from chromadb.utils import embedding_functions

# 创建客户端
client = chromadb.Client()

# 创建集合(可以指定嵌入函数)
# 这里使用默认的嵌入函数,实际应用中应选择合适的模型
embedding_function = embedding_functions.DefaultEmbeddingFunction()
collection = client.create_collection(
    name="documents",
    embedding_function=embedding_function
)

# 添加文档
documents = [
    "Python是一种易于学习的编程语言,广泛应用于数据分析和AI领域。",
    "向量数据库专门用于存储和检索向量数据,支持相似性搜索。",
    "大语言模型可以生成文本、回答问题,但存在知识截止日期的限制。",
    "RAG技术结合了检索和生成功能,提高了大语言模型回答的准确性和时效性。"
]

# 添加数据到集合
collection.add(
    documents=documents,
    ids=["doc1", "doc2", "doc3", "doc4"]
)

# 查询与"向量数据库"相似的文档
results = collection.query(
    query_texts=["向量数据库的应用"],
    n_results=2
)

print("查询结果:")
for i, doc_id in enumerate(results['ids'][0]):
    doc_text = results['documents'][0][i]
    print(f"文档ID: {doc_id}")
    print(f"文档内容: {doc_text}")
    print("-" * 50)

使用自定义嵌入模型

在实际应用中,选择合适的嵌入模型对检索质量至关重要。下面演示如何使用Sentence Transformers作为嵌入模型:

python
from sentence_transformers import SentenceTransformer
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

# 创建嵌入函数
embedding_function = SentenceTransformerEmbeddingFunction(
    model_name="paraphrase-multilingual-MiniLM-L12-v2"  # 多语言模型
)

# 创建集合
collection = client.create_collection(
    name="multilingual_docs",
    embedding_function=embedding_function
)

# 添加多语言文档
documents = [
    "Python是一种易于学习的编程语言,广泛应用于数据分析和AI领域。",
    "Python is a programming language that is easy to learn and widely used in data analysis and AI.",
    "向量数据库专门用于存储和检索向量数据,支持相似性搜索。",
    "Vector databases are specifically designed for storing and retrieving vector data, supporting similarity search."
]

collection.add(
    documents=documents,
    ids=["zh_doc1", "en_doc1", "zh_doc2", "en_doc2"]
)

# 跨语言查询
results = collection.query(
    query_texts=["Vector database applications"],
    n_results=2
)

print("查询结果:")
for i, doc_id in enumerate(results['ids'][0]):
    doc_text = results['documents'][0][i]
    print(f"文档ID: {doc_id}")
    print(f"文档内容: {doc_text}")
    print("-" * 50)

使用元数据和过滤

Chroma支持为每个文档添加元数据,并基于元数据进行过滤:

python
# 添加带元数据的文档
collection.add(
    documents=[
        "Python适合初学者和专业开发者使用。",
        "机器学习是人工智能的一个子领域。",
        "向量数据库可以高效存储和检索高维向量。",
        "自然语言处理技术使计算机能够理解人类语言。"
    ],
    metadatas=[
        {"category": "programming", "difficulty": "beginner"},
        {"category": "ai", "difficulty": "intermediate"},
        {"category": "database", "difficulty": "advanced"},
        {"category": "nlp", "difficulty": "intermediate"}
    ],
    ids=["doc5", "doc6", "doc7", "doc8"]
)

# 使用元数据过滤查询
results = collection.query(
    query_texts=["人工智能技术"],
    n_results=3,
    where={"difficulty": "intermediate"}  # 只返回中级难度的文档
)

print("过滤查询结果:")
for i, doc_id in enumerate(results['ids'][0]):
    doc_text = results['documents'][0][i]
    print(f"文档ID: {doc_id}")
    print(f"文档内容: {doc_text}")
    print("-" * 50)

持久化存储

默认情况下,Chroma使用内存存储,数据会在程序结束后丢失。为了持久化数据,可以使用PersistentClient:

python
persistent_client = chromadb.PersistentClient(path="./chroma_db")
persistent_collection = persistent_client.create_collection(name="persistent_docs")

# 添加数据
persistent_collection.add(
    documents=["这是一个持久化存储的文档示例"],
    ids=["persistent_doc1"]
)

# 数据会保存在./chroma_db目录中

实践二:使用FAISS构建高性能向量检索系统

FAISS (Facebook AI Similarity Search) 是一个用于高效相似性搜索和密集向量聚类的库,专为处理大规模向量集合而设计。

安装FAISS

方法1:使用pip安装

bash
# 仅CPU版本
pip install faiss-cpu

# GPU版本(需要CUDA支持)
pip install faiss-gpu

方法2:使用conda安装(推荐,特别是GPU版本)

bash
# CPU版本
conda install -c conda-forge faiss-cpu

# GPU版本
conda install -c conda-forge faiss-gpu

方法3:使用Docker(适合生产环境)

bash
# 拉取官方FAISS镜像
docker pull atreiding/faiss:latest

# 运行容器
docker run -it --rm atreiding/faiss:latest

基本使用示例

python
import numpy as np
import faiss

# 创建随机向量数据集作为示例
dimension = 128  # 向量维度
num_vectors = 1000  # 向量数量
vectors = np.random.random((num_vectors, dimension)).astype('float32')

# 创建索引(使用L2距离)
index = faiss.IndexFlatL2(dimension)

# 添加向量到索引
index.add(vectors)

# 创建查询向量
query_vector = np.random.random((1, dimension)).astype('float32')

# 执行搜索,获取最近的5个向量
k = 5
distances, indices = index.search(query_vector, k)

print("搜索结果索引:", indices)
print("距离:", distances)

高级索引类型

FAISS提供多种索引类型,适合不同规模和需求:

python
# 基础精确搜索,适合小规模数据(<1M)
index_flat = faiss.IndexFlatL2(dimension)

# 倒排文件索引,适合中等规模数据(<10M)
nlist = 100  # 聚类中心数量
index_ivf = faiss.IndexIVFFlat(index_flat, dimension, nlist)
index_ivf.train(vectors)  # IVF索引需要训练

# 乘积量化 + IVF,适合大规模数据
index_ivfpq = faiss.IndexIVFPQ(index_flat, dimension, nlist, 8, 8)
index_ivfpq.train(vectors)

# HNSW索引,高速搜索
M = 16  # 连接数
index_hnsw = faiss.IndexHNSWFlat(dimension, M)

与句向量嵌入结合使用

python
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

# 加载预训练模型
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# 示例文本
texts = [
    "人工智能正在改变各行各业",
    "机器学习是AI的一个子领域",
    "深度学习基于神经网络",
    "自然语言处理让机器理解人类语言",
    "计算机视觉使机器能够"看见"世界"
]

# 生成嵌入
embeddings = model.encode(texts)
embeddings = np.array(embeddings).astype('float32')

# 创建索引
dimension = embeddings.shape[1]
index = faiss.IndexFlatIP(dimension)  # 内积相似度(余弦相似度需要归一化向量)

# 添加向量
index.add(embeddings)

# 查询
query = "AI技术的应用"
query_embedding = model.encode([query])[0].astype('float32').reshape(1, -1)

# 搜索
k = 2
distances, indices = index.search(query_embedding, k)

print(f"查询: '{query}'")
for i, idx in enumerate(indices[0]):
    print(f"结果 {i+1}: {texts[idx]}, 相似度: {distances[0][i]}")

实践三:使用Milvus构建分布式向量检索系统

Milvus是一个开源的向量数据库,专为大规模向量检索设计,支持分布式部署和水平扩展。

安装Milvus

方法1:使用Docker快速部署(单机版)

bash
# 创建存储目录
mkdir -p ~/milvus/data
mkdir -p ~/milvus/conf

# 下载配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-standalone-docker-compose.yml -O docker-compose.yml

# 启动Milvus
docker-compose up -d

方法2:使用Docker Compose(集群版)

bash
# 下载集群版配置
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-cluster-docker-compose.yml -O docker-compose.yml

# 启动Milvus集群
docker-compose up -d

方法3:使用Helm在Kubernetes上部署

bash
# 添加Milvus Helm仓库
helm repo add milvus https://milvus-io.github.io/milvus-helm/

# 更新仓库
helm repo update

# 安装Milvus(单机版)
helm install my-release milvus/milvus --set cluster.enabled=false

# 安装Milvus(集群版)
helm install my-release milvus/milvus --set cluster.enabled=true

验证安装

bash
# 检查容器运行状态
docker-compose ps

# 访问Milvus Web界面
# 在浏览器中打开 http://localhost:9001

安装Python客户端

bash
pip install pymilvus

基本使用方法

python
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# 连接到Milvus服务
connections.connect("default", host="localhost", port="19530")

# 定义集合字段
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000)
]

# 创建集合模式
schema = CollectionSchema(fields=fields, description="文档检索")

# 创建集合
collection_name = "document_collection"
if utility.has_collection(collection_name):
    collection = Collection(name=collection_name)
else:
    collection = Collection(name=collection_name, schema=schema)

# 创建索引
index_params = {
    "metric_type": "COSINE",  # 余弦相似度
    "index_type": "HNSW",     # HNSW索引
    "params": {
        "M": 16,              # HNSW图的出度参数
        "efConstruction": 200 # 构建时的搜索深度
    }
}

# 为嵌入字段创建索引
collection.create_index(field_name="embedding", index_params=index_params)

添加和检索数据

python
from sentence_transformers import SentenceTransformer

# 准备文本数据
documents = [
    "Milvus是一个专门为向量相似性搜索和AI应用设计的数据库",
    "向量数据库支持高效的相似性搜索和近似最近邻查询",
    "与传统数据库不同,向量数据库优化了对高维向量数据的管理",
    "人工智能应用中经常需要处理特征向量和嵌入表示",
    "嵌入模型将文本转换为高维向量,捕捉其语义含义",
    "大语言模型结合向量检索可以实现更精准的问答系统"
]

# 加载嵌入模型
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

# 生成嵌入
embeddings = model.encode(documents)

# 准备插入的数据
entities = [
    {"embedding": embeddings.tolist(), "text": documents}
]

# 插入数据
collection.insert(entities)

# 加载集合(使集合对搜索可见)
collection.load()

# 执行向量搜索
query = "向量数据库的应用"
query_embedding = model.encode([query])[0].tolist()

search_params = {
    "metric_type": "COSINE",
    "params": {"ef": 100}  # 搜索时的候选池大小
}

results = collection.search(
    data=[query_embedding],         # 查询向量
    anns_field="embedding",         # 搜索的字段
    param=search_params,            # 搜索参数
    limit=3,                        # 返回前3个结果
    output_fields=["text"]          # 返回的字段
)

# 显示结果
print(f"查询: '{query}'")
for i, hits in enumerate(results):
    print(f"查询结果 {i+1}:")
    for hit in hits:
        print(f"ID: {hit.id}, 距离: {hit.distance}")
        print(f"文本: {hit.entity.get('text')}")
        print("-" * 50)

使用元数据过滤

Milvus支持向量搜索与标量过滤的结合:

python
# 添加带分类的文档
documents_with_category = [
    {"text": "Python是一种流行的编程语言", "category": "programming"},
    {"text": "JavaScript用于网页交互", "category": "programming"},
    {"text": "机器学习使用算法从数据中学习", "category": "ai"},
    {"text": "深度学习是机器学习的一个子领域", "category": "ai"},
    {"text": "向量数据库优化了向量检索操作", "category": "database"},
    {"text": "传统关系型数据库使用SQL查询", "category": "database"}
]

# 修改集合模式以包含分类字段
category_field = FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=100)
fields.append(category_field)

# 重新创建集合(实际应用中应考虑避免重复创建)
new_collection_name = "categorized_documents"
new_schema = CollectionSchema(fields=fields, description="带分类的文档检索")

if utility.has_collection(new_collection_name):
    utility.drop_collection(new_collection_name)
    
new_collection = Collection(name=new_collection_name, schema=new_schema)

# 为嵌入字段创建索引
new_collection.create_index(field_name="embedding", index_params=index_params)

# 生成嵌入并插入数据
for doc in documents_with_category:
    embedding = model.encode([doc["text"]])[0].tolist()
    new_collection.insert([{
        "embedding": embedding,
        "text": doc["text"],
        "category": doc["category"]
    }])

# 加载集合
new_collection.load()

# 带过滤条件的搜索(只搜索AI类别的文档)
query = "机器学习技术"
query_embedding = model.encode([query])[0].tolist()

results = new_collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=2,
    expr="category == 'ai'",  # 表达式过滤条件
    output_fields=["text", "category"]
)

# 显示结果
print(f"带过滤的查询: '{query}' (类别: ai)")
for hits in results:
    for hit in hits:
        print(f"ID: {hit.id}, 距离: {hit.distance}")
        print(f"文本: {hit.entity.get('text')}")
        print(f"类别: {hit.entity.get('category')}")
        print("-" * 50)

实战项目:构建语义文档检索系统

下面我们将整合前面学习的内容,构建一个完整的语义文档检索系统:

python
import os
import argparse
import sys
from pathlib import Path
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

def extract_text_from_file(file_path):
    """从文件中提取文本内容"""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
        return ""

def create_document_chunks(text, chunk_size=1000, overlap=200):
    """将长文本分割为重叠的块"""
    chunks = []
    if len(text) <= chunk_size:
        chunks.append(text)
    else:
        for i in range(0, len(text), chunk_size - overlap):
            chunk = text[i:i + chunk_size]
            if len(chunk) >= 200:  # 避免太小的块
                chunks.append(chunk)
    return chunks

def build_vector_db(docs_dir, db_path):
    """从文档目录构建向量数据库"""
    # 初始化Chroma客户端
    client = chromadb.PersistentClient(path=db_path)
    
    # 使用多语言模型作为嵌入函数
    embedding_function = SentenceTransformerEmbeddingFunction(
        model_name="paraphrase-multilingual-MiniLM-L12-v2"
    )
    
    # 创建或获取集合
    if "documents" in client.list_collections():
        collection = client.get_collection(
            name="documents",
            embedding_function=embedding_function
        )
        print("使用现有集合")
    else:
        collection = client.create_collection(
            name="documents",
            embedding_function=embedding_function
        )
        print("创建新集合")
    
    # 处理文档
    documents = []
    metadatas = []
    ids = []
    doc_id = 0
    
    for root, _, files in os.walk(docs_dir):
        for filename in files:
            if filename.endswith('.txt') or filename.endswith('.md'):
                file_path = os.path.join(root, filename)
                content = extract_text_from_file(file_path)
                
                # 分块处理长文档
                chunks = create_document_chunks(content)
                
                for i, chunk in enumerate(chunks):
                    documents.append(chunk)
                    
                    rel_path = os.path.relpath(file_path, docs_dir)
                    metadatas.append({
                        "source": rel_path,
                        "filename": filename,
                        "chunk": i
                    })
                    
                    ids.append(f"doc_{doc_id}_{i}")
                
                doc_id += 1
                print(f"已处理: {file_path} -> {len(chunks)}个块")
    
    # 批量添加到集合
    if documents:
        # Chroma有批量大小限制,分批添加
        batch_size = 100
        for i in range(0, len(documents), batch_size):
            end = min(i + batch_size, len(documents))
            collection.add(
                documents=documents[i:end],
                metadatas=metadatas[i:end],
                ids=ids[i:end]
            )
        
        print(f"向量数据库构建完成! 共添加了 {len(documents)} 个文档块。")
    else:
        print("未找到可处理的文档")
    
    return collection

def search_documents(collection, query, top_k=5):
    """搜索文档"""
    results = collection.query(
        query_texts=[query],
        n_results=top_k
    )
    
    search_results = []
    if results['documents'] and results['documents'][0]:
        for i, (doc, metadata, doc_id) in enumerate(zip(
            results['documents'][0],
            results['metadatas'][0],
            results['ids'][0]
        )):
            result = {
                'content': doc,
                'metadata': metadata,
                'id': doc_id,
                'relevance': results['distances'][0][i] if 'distances' in results else None
            }
            search_results.append(result)
    
    return search_results

def display_results(results, query):
    """显示搜索结果"""
    print(f"\n找到 {len(results)} 个匹配 '{query}' 的文档块:")
    print("="*80)
    
    for i, result in enumerate(results):
        source = result['metadata'].get('source', 'Unknown')
        print(f"[{i+1}] 来源: {source} (块 {result['metadata'].get('chunk', 0)})")
        
        if result['relevance'] is not None:
            print(f"相关度: {1 - result['relevance']:.4f}")
            
        # 显示部分内容
        content = result['content']
        if len(content) > 300:
            content = content[:300] + "..."
        print(f"内容: {content}")
        print("-"*80)

def interactive_search(collection):
    """交互式搜索界面"""
    print("\n欢迎使用语义文档检索系统!")
    print("输入 'q' 退出")
    
    while True:
        query = input("\n请输入搜索关键词: ")
        
        if query.lower() == 'q':
            break
            
        results = search_documents(collection, query)
        display_results(results, query)
        
        if results:
            while True:
                detail = input("\n输入编号查看完整内容 (或'n'继续): ")
                if detail.lower() == 'n':
                    break
                    
                try:
                    idx = int(detail) - 1
                    if 0 <= idx < len(results):
                        print("\n" + "="*80)
                        print(f"来源: {results[idx]['metadata'].get('source')}")
                        print("完整内容:")
                        print(results[idx]['content'])
                        print("="*80)
                    else:
                        print("无效的编号")
                except ValueError:
                    print("请输入有效的数字或'n'")

def main():
    parser = argparse.ArgumentParser(description='语义文档检索系统')
    parser.add_argument('--docs', type=str, help='要索引的文档目录')
    parser.add_argument('--db', type=str, default='./vector_db', help='向量数据库存储路径')
    parser.add_argument('--rebuild', action='store_true', help='重建向量数据库')
    args = parser.parse_args()
    
    # 验证向量数据库是否存在
    db_exists = os.path.exists(args.db) and os.path.isdir(args.db)
    
    if not db_exists or args.rebuild:
        if not args.docs:
            print("错误: 向量数据库不存在,请提供文档目录以构建数据库")
            parser.print_help()
            sys.exit(1)
        
        # 构建向量数据库
        print(f"正在为目录 '{args.docs}' 构建向量数据库...")
        collection = build_vector_db(args.docs, args.db)
    else:
        # 打开现有向量数据库
        print(f"打开现有向量数据库: {args.db}")
        client = chromadb.PersistentClient(path=args.db)
        
        embedding_function = SentenceTransformerEmbeddingFunction(
            model_name="paraphrase-multilingual-MiniLM-L12-v2"
        )
        
        collection = client.get_collection(
            name="documents",
            embedding_function=embedding_function
        )
    
    # 启动交互式搜索
    interactive_search(collection)

if __name__ == "__main__":
    main()

使用方法

bash
# 构建向量数据库
python semantic_search.py --docs /path/to/your/documents --db ./vector_db

# 使用现有向量数据库
python semantic_search.py --db ./vector_db

# 重建向量数据库
python semantic_search.py --docs /path/to/your/documents --db ./vector_db --rebuild

小结

在本节中,我们学习了:

  1. 向量数据库的基本概念和主要功能
  2. 主流向量数据库的特点和应用场景
  3. 使用Chroma构建简单的向量检索系统
  4. 使用FAISS实现高性能向量检索
  5. 使用Milvus构建分布式向量检索系统
  6. 实现一个完整的语义文档检索应用

向量数据库是大语言模型应用的重要基础设施,特别是在构建RAG(检索增强生成)系统时发挥关键作用。选择合适的向量数据库取决于具体的应用场景、数据规模和性能需求。

思考题

  1. 在选择向量数据库时,应考虑哪些关键因素?不同应用场景下,如何权衡这些因素?
  2. 文档分块策略(块大小、重叠程度)如何影响检索效果?如何为特定应用确定最佳的分块策略?
  3. 如何评估向量检索系统的质量?除了召回率和精确度外,还有哪些指标可以考虑?
  4. 向量数据库如何与传统数据库结合,构建更复杂的应用?

在下一节中,我们将学习如何将向量数据库与大语言模型集成,构建完整的RAG系统。